Rxjava操作符:第4篇 Combining Observables

本文基于Rxjava 2.x版本,介绍用于操作多个 Observable 对象的操作符。

Operators that work with multiple source Observables to create a single Observable

  • And/Then/When — combine sets of items emitted by two or more Observables by means of Patternand Plan intermediaries
  • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
  • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
  • Merge — combine multiple Observables into one by merging their emissions
  • StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
  • Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
  • Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

startWith操作符

在 Observable 序列发送之前添加起始位置指定发送的 Observable

startWith Example

1
2
3
4
Observable<String> names = Observable.just("Spock", "McCoy");
names.startWith("Kirk").subscribe(item -> System.out.println(item));

// prints Kirk, Spock, McCoy
startWith 操作符

merge 操作符

merge 操作符

merge 操作符在用来合并多个 Observable 操作,合并的序列依次调用 Emitter#onNext() 方法执行,遇到错误序列将不继续执行。

merge Example

1
2
3
4
5
Observable.just(1, 2, 3)
.mergeWith(Observable.just(4, 5, 6))
.subscribe(item -> System.out.println(item));

// prints 1, 2, 3, 4, 5, 6
merge 操作符

mergeDelayError 操作符

和 merge 操作符类似,不过遇到错误会保存下来,等所有 Observable 执行完毕,再处理出现错误的Observable。注意 mergeDelayError 是静态泛型方法,只可以通过类名访问

mergeDelayError Example

1
2
3
4
5
6
7
Observable<String> observable1 = Observable.error(new IllegalArgumentException(""));
Observable<String> observable2 = Observable.just("Four", "Five", "Six");
Observable.mergeDelayError(observable1, observable2)
.subscribe(item -> System.out.println(item));

// emits 4, 5, 6 and then the IllegalArgumentException (in this specific
// example, this throws an `OnErrorNotImplementedException`).
mergeDelayError 操作符

zip 操作符

将多个 Observable 发射的数据合并起来,并生成一个Observable根据合并后的数据继续向下传递。onNext 方法只会调用一次。zipzipArrayzipIterablezipWith

zip Example

1
2
3
4
5
6
Observable<String> firstNames = Observable.just("James", "Jean-Luc", "Benjamin");
Observable<String> lastNames = Observable.just("Kirk", "Picard", "Sisko");
firstNames.zipWith(lastNames, (first, last) -> first + " " + last)
.subscribe(item -> System.out.println(item));

// prints James Kirk, Jean-Luc Picard, Benjamin Sisko
zipWith 操作符

zip 操作符 会将对应 Observable 对应的 Emitter onNext 的数据合并最后通过新生成的 Observable 发射调整后的数据。

combineLatest 操作符

让两个 Obserable 序列按照最新发出的数据组合在一起生成一个 Observable 序列向下传递。

combineLatest Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable<Long> newsRefreshes = Observable.interval(100, TimeUnit.MILLISECONDS);
Observable<Long> weatherRefreshes = Observable.interval(50, TimeUnit.MILLISECONDS);
Observable.combineLatest(newsRefreshes, weatherRefreshes,
(newsRefreshTimes, weatherRefreshTimes) ->
"Refreshed news " + newsRefreshTimes + " times and weather " + weatherRefreshTimes)
.subscribe(item -> System.out.println(item));

// prints:
// Refreshed news 0 times and weather 0
// Refreshed news 0 times and weather 1
// Refreshed news 0 times and weather 2
// Refreshed news 1 times and weather 2
// Refreshed news 1 times and weather 3
// Refreshed news 1 times and weather 4
// Refreshed news 2 times and weather 4
// Refreshed news 2 times and weather 5
// ...
combineLatest 操作符

switchOnNext 操作符

switchOnNext Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable<Observable<String>> timeIntervals =
Observable.interval(1, TimeUnit.SECONDS)
.map(ticks -> Observable.interval(100, TimeUnit.MILLISECONDS)
.map(innerInterval -> "outer: " + ticks + " - inner: " + innerInterval));
Observable.switchOnNext(timeIntervals)
.subscribe(item -> System.out.println(item));

// prints:
// outer: 0 - inner: 0
// outer: 0 - inner: 1
// outer: 0 - inner: 2
// outer: 0 - inner: 3
// outer: 0 - inner: 4
// outer: 0 - inner: 5
// outer: 0 - inner: 6
// outer: 0 - inner: 7
// outer: 0 - inner: 8
// outer: 1 - inner: 0
// outer: 1 - inner: 1
// outer: 1 - inner: 2
// outer: 1 - inner: 3
// ...
switchOnNext 操作符

参考文章:

https://github.com/ReactiveX/RxJava/wiki/Combining-Observables

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×